/**
 * 
 */
package com.app.store;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

import com.app.db.DaoUtil;
import com.app.exception.DCacheException;
import com.app.model.ColumnInfo;
import com.app.model.Condition;
import com.app.model.IndexEntry;
import com.app.model.KVTableInfo;
import com.app.model.TableInfo;
import com.app.util.ClassTableUtil;
import com.app.util.StringUtils;



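/**
 * MySQL-backed key/value store. Each logical table is stored as rows of
 * {@code key_}/{@code value_}, and every indexed column is kept in a companion
 * table named {@code <table>__<column>}.
 *
 * @author lisong
 * @version 2013-5-24 11:58:45 AM
 */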
public class KeyValueStore extends AbstractMySqlStore {
	
	/**
	 * Separator placed between a base table name and an indexed column name
	 * to form the name of an index table ({@code <table>__<column>}).
	 */
	private final String TABLE_INDEX_SPLIT = "__";
	/** Name of the key column; also the map key under which the row key is carried through the update queue. */
	private final String COLUMN_KEY = "key_";
	/** Name of the value column; also the map key under which the row value is carried through the update queue. */
	private final String COLUMN_VALUE = "value_";
	
	/** Creates the store, passing the {@code KV_STORE} identifier to the superclass constructor. */
	KeyValueStore() {
		super(KV_STORE);
	}
	
	/**
	 * Creates the main table and all index tables for every given table
	 * definition, then refreshes the in-memory table cache.
	 * <p>
	 * The call is a no-op once initialisation has completed. If it fails, the
	 * {@code init} flag is cleared again so that a later call can retry; all
	 * DDL statements use {@code IF NOT EXISTS}, so retrying is harmless.
	 *
	 * @param tableInfos definitions of the tables to create
	 * @throws DCacheException if a table name contains the index separator,
	 *                         or if any database operation fails
	 */
	@Override
	public void initTables(List<TableInfo> tableInfos) throws DCacheException {

		if(init) {
			return;
		}
		// Claim the flag up front so overlapping callers still return early,
		// but release it again if initialisation does not complete.
		init = true;
		boolean completed = false;
		try {
			Connection conn = null;
			try {
				conn = pool.getConnection();
				for(TableInfo tableInfo : tableInfos) {
					String tableName = tableInfo.getTableName();

					// "__" is reserved for index table names (<table>__<column>).
					if(tableName.indexOf(TABLE_INDEX_SPLIT) != -1) {
						throw new DCacheException("table name cant contains: "+TABLE_INDEX_SPLIT);
					}
					createTable(tableName,conn);

					buildIndexTableSQL(tableName, tableInfo.getColInfos(),conn);
				}
			}catch(SQLException e) {
				logger.error(e.getMessage(), e);
				throw new DCacheException(e);
			} finally {
				DaoUtil.close(conn);
			}

			cacheTables();
			completed = true;
		} finally {
			if(!completed) {
				init = false;
			}
		}
	}
	
	/**
	 * Rebuilds the in-memory table cache from {@code show tables}.
	 * <p>
	 * Tables whose name contains the index separator are treated as index
	 * tables and registered on the {@link KVTableInfo} of their base table
	 * (the part of the name before the separator). Metadata objects that are
	 * already cached are reused; base tables seen for the first time get their
	 * columns loaded via {@code getColumnInfo}. After the scan, cache entries
	 * for tables that no longer exist are removed.
	 *
	 * @return the refreshed table cache
	 * @throws DCacheException if the table list or column metadata cannot be read
	 */
	@Override
	public Map<String,TableInfo> cacheTables() throws DCacheException {
		PreparedStatement ps=null;
		Connection conn=null;
		String sql = "show tables";
		ResultSet rs = null;
		try{
			// Every base table seen in this scan; used both to publish new
			// entries and to decide which stale entries to evict.
			Map<String,TableInfo> seen = new ConcurrentHashMap<String, TableInfo>();
			conn=pool.getConnection();
			ps=conn.prepareStatement(sql);
			rs = ps.executeQuery();
			while (rs.next()) {
				String tableName = rs.getString(1);
				String indexName = null;

				int index = tableName.indexOf(TABLE_INDEX_SPLIT);

				if(index != -1) {
					indexName = tableName;
					tableName = tableName.substring(0,index);
				}

				// Look at this scan's results first: on a first pass the base
				// table is not in the shared cache yet, and re-creating its
				// info for each index row would drop earlier index names.
				KVTableInfo tableInfo = (KVTableInfo) seen.get(tableName);

				if(tableInfo == null) {
					tableInfo = (KVTableInfo) tables.get(tableName);
					if(tableInfo == null) {
						tableInfo = new KVTableInfo();
						tableInfo.getColInfos().clear();
						tableInfo.addAllColInfo(getColumnInfo(tableName,conn).values());
						for(ColumnInfo ci : tableInfo.getColInfos()) {
							if(ci.isKey()) {
								tableInfo.setKey(ci);
								break;
							}
						}
					}
					// Already-cached tables must be recorded too, otherwise the
					// eviction step below would throw them out.
					seen.put(tableName, tableInfo);
				}
				// Reused infos already carry index names from earlier refreshes.
				if(indexName != null && !tableInfo.getTableIndexs().contains(indexName)) {
					tableInfo.getTableIndexs().add(indexName);
				}
			}

			tables.putAll(seen);
			// Evict tables that have disappeared from the database.
			tables.keySet().retainAll(seen.keySet());

		} catch (Exception e) {
			logger.error(e.getMessage()+" sql:"+sql, e);
			throw new DCacheException(e);
		}
		finally{
			DaoUtil.close(rs);
			DaoUtil.close(ps);
			DaoUtil.close(conn);
		}
		return tables;
	}

	/**
	 * Creates the main key/value table for {@code tableName} if it does not
	 * exist yet: a {@code varchar(100)} key column, a {@code blob} value column
	 * and an auto-updated {@code LastTime} timestamp, partitioned by key into
	 * {@code cfg.getTablePartition()} partitions.
	 *
	 * @param tableName name of the table to create
	 * @param conn      connection to run the DDL on; not closed here
	 * @throws DCacheException if the statement cannot be prepared or executed
	 */
	private void  createTable(String tableName,Connection conn) throws DCacheException{
		String ddl = "CREATE TABLE IF NOT EXISTS `" + tableName + "`"
				+ " (`" + COLUMN_KEY + "` varchar(100) NOT NULL,"
				+ " `" + COLUMN_VALUE + "` blob,"
				+ " `LastTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,"
				+ " PRIMARY KEY (`" + COLUMN_KEY + "`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin"
				+ " PARTITION BY KEY (" + COLUMN_KEY + ")"
				+ " PARTITIONS " + cfg.getTablePartition();

		PreparedStatement stmt = null;
		try {
			stmt = conn.prepareStatement(ddl);
			stmt.executeUpdate();
			logger.info("create table :"+tableName);
		} catch (Exception e) {
			logger.error(e.getMessage()+" sql:"+ddl);
			e.printStackTrace();
			throw new DCacheException(e);
		} finally {
			DaoUtil.close(stmt);
			stmt = null;
		}
	}
	
	/**
	 * Creates one index table per indexed column of {@code tableName}.
	 * <p>
	 * Each index table is named {@code <table>__<column>}, has a
	 * {@code bigint} key column, a value column of the indexed column's type,
	 * a secondary index on the value, and is partitioned by key.
	 *
	 * @param tableName base table name
	 * @param columns   column definitions; only those with {@code isIndex()} are used
	 * @param conn      connection to run the DDL on; not closed here
	 * @throws DCacheException if an index table name is too long or ends with a
	 *                         reserved column name, or if the DDL fails
	 */
	private void buildIndexTableSQL(String tableName, List<ColumnInfo> columns,Connection conn) throws DCacheException {

		Statement stmt = null;
		// Kept outside the loop so the failing statement can be logged.
		String createSql = null;
		try {
			for(ColumnInfo ci : columns) {
				if(!ci.isIndex()) {
					continue;
				}
				String tableIndexName =  genIndexTable(tableName, ci.getName());
				if(tableIndexName.length() > ClassTableUtil.MYSQL_TABLENAME_LIMIT) {
					throw new DCacheException("table name too long:"+tableName);
				}
				if(tableIndexName.endsWith(COLUMN_KEY) || tableIndexName.endsWith(COLUMN_VALUE)) {
					throw new DCacheException("table name cant end with "+COLUMN_KEY+" or "+COLUMN_VALUE+" tableName:"+tableName);
				}
				createSql = "CREATE TABLE IF NOT EXISTS " + tableIndexName
						+ " (" + COLUMN_KEY + " bigint  NOT NULL, "
						+ COLUMN_VALUE + " " + ci.getType()
						+ ", PRIMARY KEY (" + COLUMN_KEY
						+ "),INDEX value_index(" + COLUMN_VALUE
						+ ") )  ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin PARTITION BY KEY (" + COLUMN_KEY
						+ ")  PARTITIONS " + cfg.getTablePartition();
				// One statement is enough for all index tables; creating a new
				// one per column leaked every statement except the last.
				if(stmt == null) {
					stmt = conn.createStatement();
				}
				stmt.executeUpdate(createSql);
				logger.debug("create indextable :"+tableIndexName);
			}
		} catch (DCacheException e) {
			// Validation failures are already the right type; do not re-wrap.
			throw e;
		} catch (Exception e) {
			// createSql may still be null here; string concatenation is null-safe.
			logger.error(e.getMessage() + " sql:" + createSql, e);
			throw new DCacheException(e);
		} finally {
			DaoUtil.close(stmt);
		}
	}
	
	/**
	 * Builds the index table name for a column: base table name, the index
	 * separator, then the column name.
	 *
	 * @param tableName base table name
	 * @param indexAttr indexed column name
	 * @return the index table name
	 */
	private String genIndexTable(String tableName,String indexAttr) {
		return tableName + TABLE_INDEX_SPLIT + indexAttr;
	}
	
	 /**
	  * Builds the upsert statement for an index table. The statement takes
	  * three parameters: key, value, and the value again for the
	  * {@code ON DUPLICATE KEY UPDATE} clause.
	  *
	  * @param tableIndexName name of the index table
	  * @return the parameterised SQL text
	  */
	 private String genInsertIndexSql(String tableIndexName) {
		 return "INSERT INTO " + tableIndexName
				 + " (" + COLUMN_KEY + "," + COLUMN_VALUE + ")"
				 + " VALUES(?,?) ON DUPLICATE KEY UPDATE "
				 + COLUMN_VALUE + "=?";
	 }
	 
	 
	 /**
	  * Batch-writes rows into the main table and, when index attributes are
	  * given, forwards the same keys to {@code putIndexBatch}.
	  * <p>
	  * The insert SQL comes from {@code genInsertTableSql}; it is bound with
	  * three parameters per row (key, value, value). Batches are flushed every
	  * {@code BATCH_EXECUTE_SIZE} rows and once more at the end.
	  *
	  * @param tableName      main table name
	  * @param keyList        row keys
	  * @param valueList      row values, parallel to {@code keyList}
	  * @param indexAttr      indexed attribute names; may be empty
	  * @param indexValueList index values handed on to {@code putIndexBatch}
	  * @param conn           connection to use; not closed here
	  * @throws DCacheException if a database operation fails
	  */
	 private void putMysqlBatch(String tableName,List<Long> keyList,List<String> valueList,List<String> indexAttr,List<List<String>> indexValueList,
			 Connection conn) throws DCacheException {
		 LinkedHashMap<String, String> columns = new LinkedHashMap<String,String>();
		 columns.put(COLUMN_KEY, COLUMN_KEY);
		 columns.put(COLUMN_VALUE, COLUMN_VALUE);
		 String insertSql = genInsertTableSql(tableName,columns);

		 PreparedStatement stmt = null;
		 try {
			 stmt = conn.prepareStatement(insertSql);
			 for(int row = 0; row < keyList.size(); row++) {
				 String rowValue = valueList.get(row);
				 stmt.setLong(1, keyList.get(row));
				 stmt.setString(2, rowValue);
				 stmt.setString(3, rowValue);
				 stmt.addBatch();
				 // Flush whenever a full batch has been queued.
				 if((row + 1) % BATCH_EXECUTE_SIZE == 0) {
					 stmt.executeBatch();
				 }
			 }
			 stmt.executeBatch();
			 stmt.close();
			 if(!indexAttr.isEmpty()) {
				 this.putIndexBatch(tableName, keyList, indexAttr, indexValueList,  conn);
			 }
		 } catch (SQLException e) {
			 e.printStackTrace();
			 logger.error("tableName:"+tableName+ " "+valueList);
			 throw new DCacheException(e);
		 } finally {
			 DaoUtil.close(stmt);
			 stmt = null;
		 }
	 }
	 
	 /**
	  * Upserts one row's index values: for each attribute, writes
	  * {@code (id, value)} into that attribute's index table.
	  *
	  * @param tableName  base table name
	  * @param id         row key
	  * @param indexAttr  indexed attribute names
	  * @param indexValue index values, parallel to {@code indexAttr}
	  * @param conn       connection to use, or {@code null} to borrow one from
	  *                   the pool for the duration of the call
	  * @throws DCacheException if a database operation fails
	  */
	private void putIndex(String tableName, long id, List<String> indexAttr,
			List<String> indexValue,Connection conn)
			throws DCacheException {
		// Only a connection borrowed here is closed here.
		final boolean borrowed = (conn == null);
		PreparedStatement stmt = null;
		try {
			if(borrowed) {
				conn = pool.getConnection();
			}
			for (int pos = 0; pos < indexAttr.size(); pos++) {
				String attrName = indexAttr.get(pos);
				String attrValue = indexValue.get(pos);
				stmt = conn.prepareStatement(genInsertIndexSql(genIndexTable(tableName, attrName)));
				stmt.setLong(1, id);
				stmt.setString(2, attrValue);
				stmt.setString(3, attrValue);
				stmt.executeUpdate();
				stmt.close();
				stmt = null;
			}
		} catch (SQLException e) {
			e.printStackTrace();
			throw new DCacheException(e);
		} finally {
			DaoUtil.close(stmt);
			stmt = null;
			if(borrowed) {
				DaoUtil.close(conn);
				conn = null;
			}
		}
	}
	
	/**
	 * Batch-writes index rows. For each attribute position {@code i} this
	 * prepares the upsert for that attribute's index table, reads
	 * {@code keyList.get(i)} and {@code indexValueList.get(i)}, and queues one
	 * row per value in that list, all under that same key.
	 * <p>
	 * NOTE(review): the key is selected by the attribute position, not by the
	 * position of the value, so every value of an attribute is written under
	 * one key and later values overwrite earlier ones. It is unclear from this
	 * file whether {@code indexValueList} is meant to be organised per
	 * attribute or per entity ({@code putMultiEntityList} builds it per
	 * entity) -- confirm the intended layout before changing this.
	 *
	 * @param tableName base table name
	 * @param keyList row keys
	 * @param indexAttr indexed attribute names shared by all keys
	 * @param indexValueList one list of values per position {@code i}
	 * @param conn connection to use; not closed here
	 * @throws DCacheException if a database operation fails
	 */
	private void putIndexBatch(String tableName, List<Long> keyList,List<String> indexAttr,List<List<String>> indexValueList,
			 Connection conn) throws DCacheException {
		 PreparedStatement ps=null;
		 try{
			 
			 for(int i=0;i<indexAttr.size();i++) {
				 String attr = indexAttr.get(i);
				 String sql = genInsertIndexSql(genIndexTable(tableName, attr));
				 ps = conn.prepareStatement(sql);
				 // Indexed by attribute position i; see the review note above.
				 long key = keyList.get(i);
				 List<String> valueList = indexValueList.get(i);
				 int tmp = 0;
				 for(String value : valueList) {
					ps.setLong(1, key);
					ps.setString(2, value);
					ps.setString(3, value);
					ps.addBatch();
					tmp ++;
					 // Flush each time a full batch has been queued.
					 if(tmp % BATCH_EXECUTE_SIZE == 0) {
						 ps.executeBatch();
					 }
				 }
				 // Flush whatever is left for this attribute.
				 ps.executeBatch();
				 ps.close();
			 }
		 }catch(SQLException e) {
			 e.printStackTrace();
			 throw new DCacheException(e);
		 }finally {
			 DaoUtil.close(ps);
			 ps = null;
		 }
	 }
	
	/**
	 * Builds a literal delete statement that removes the row with the given
	 * key from a table.
	 *
	 * @param tableName table to delete from
	 * @param id        key of the row to remove
	 * @return the SQL text with the key inlined
	 */
	private String genDeleteTableIndexSql(String tableName,long id) {
		return "DELETE FROM " + tableName + " WHERE " + COLUMN_KEY + "=" + id;
	}
	
	/**
	 * Builds a parameterised delete-by-key statement for a table; the single
	 * parameter is the key.
	 *
	 * @param tableName table to delete from
	 * @return the parameterised SQL text
	 */
	private String genDeleteTableIndexBatchSql(String tableName) {
		return "DELETE FROM " + tableName + " WHERE " + COLUMN_KEY + "=" + "?";
	}
	
	 /**
	  * Deletes the row with the given key from every listed index table in a
	  * single transaction.
	  * <p>
	  * The deletes are committed explicitly. On failure the transaction is
	  * rolled back, and in all cases the connection's previous auto-commit
	  * mode is restored before it goes back to the pool.
	  *
	  * @param id             key of the rows to delete
	  * @param indexTableName names of the index tables to delete from
	  * @throws DCacheException if a database operation fails
	  */
	 private void deleteIndex(long id,List<String> indexTableName) throws DCacheException {
		Statement stmt = null;
		Connection conn = null;
		boolean previousAutoCommit = true;
		// True once auto-commit has been switched off and must be restored.
		boolean inTransaction = false;
		 try{
			 conn = pool.getConnection();

			 previousAutoCommit = conn.getAutoCommit();
			 conn.setAutoCommit(false);
			 inTransaction = true;
			 stmt = conn.createStatement();
			 for(String idxname:indexTableName) {
				stmt.addBatch(genDeleteTableIndexSql(idxname, id));
			 }
			 stmt.executeBatch();
			 // Do not rely on setAutoCommit(true) committing as a side effect;
			 // it would not commit if the connection started with auto-commit off.
			 conn.commit();
		 }catch (Exception e) {
			 if(inTransaction) {
				 try {
					 conn.rollback();
				 } catch (SQLException rollbackError) {
					 logger.error(rollbackError.getMessage(), rollbackError);
				 }
			 }
			 logger.error(e.getMessage(), e);
			 throw new DCacheException(e);
		 }
		 finally{
			 DaoUtil.close(stmt);
			 if(inTransaction) {
				 try {
					 conn.setAutoCommit(previousAutoCommit);
				 } catch (SQLException restoreError) {
					 logger.error(restoreError.getMessage(), restoreError);
				 }
			 }
			 DaoUtil.close(conn);
		 }
	 }
	 
	 /**
	  * Deletes the given keys from every listed index table, using one batched
	  * statement per table that is flushed every {@code BATCH_EXECUTE_SIZE}
	  * keys and once more at the end.
	  *
	  * @param tableIndexName names of the index tables to delete from
	  * @param keyList        keys of the rows to delete
	  * @throws DCacheException if a database operation fails
	  */
	 private void deleteIndexBatch(List<String> tableIndexName,List<Long> keyList) throws DCacheException {
		 Connection conn = null;
		 PreparedStatement stmt = null;
		 try {
			 conn = pool.getConnection();
			 for(String indexTable : tableIndexName) {
				 stmt = conn.prepareStatement(genDeleteTableIndexBatchSql(indexTable));
				 int queued = 0;
				 for(long rowKey : keyList) {
					 stmt.setLong(1, rowKey);
					 stmt.addBatch();
					 if(++queued % BATCH_EXECUTE_SIZE == 0) {
						 stmt.executeBatch();
					 }
				 }
				 stmt.executeBatch();
				 DaoUtil.close(stmt);
				 stmt = null;
			 }
		 } catch (Exception e) {
			 e.printStackTrace();
			 throw new DCacheException(e);
		 } finally {
			 DaoUtil.close(stmt);
			 DaoUtil.close(conn);
			 conn = null;
			 stmt = null;
		 }
	 }
	 
	/**
	 * Stores one entity. With {@code flushdb} the row is written to the main
	 * table and, when the attribute and value lists are non-empty and of equal
	 * size, to the index tables; otherwise the change is placed on the update
	 * queue. In both cases the value is then written to the cache and the put
	 * counters are updated.
	 *
	 * @param tableName  table name
	 * @param id         row key
	 * @param value      row value
	 * @param indexAttr  indexed attribute names
	 * @param indexValue index values, parallel to {@code indexAttr}
	 * @param changed    changed attributes, used when queueing
	 * @param flushdb    whether to write to the database immediately
	 * @return {@code 1} on success
	 * @throws DCacheException wrapping any failure
	 */
	@Override
	public int putEntity(String tableName, long id, String value,
			List<String> indexAttr, List<String> indexValue, LinkedHashMap<String, String> changed, boolean flushdb)
			throws DCacheException {
		try {
			if(!flushdb) {
				// Deferred write: hand the change to the update queue.
				putUpdateQueue(tableName, id, value, indexAttr, indexValue, changed);
			} else {
				LinkedHashMap<String, String> row = new LinkedHashMap<String,String>();
				row.put(COLUMN_KEY, String.valueOf(id));
				row.put(COLUMN_VALUE, value);
				this.putDbValue(tableName,row);
				boolean hasIndexData = indexAttr.size()>0 && indexAttr.size() == indexValue.size();
				if(hasIndexData) {
					this.putIndex(tableName, id, indexAttr, indexValue,null);
				}
			}

			String cacheKey = this.genMemcKey(tableName, id);
			boolean cached = client.set(cacheKey, value,DEFAULT_EXPIRE);
			if(!cached) {
				logger.error("数据存储到缓存失败");
			}

			putTimes.getAndAdd(1);
			if(logger.isDebugEnabled()) {
				// Per-table counter, created on first use.
				AtomicLong counter = new AtomicLong();
				AtomicLong existing = put.putIfAbsent(tableName, counter);
				if(existing != null) {
					counter = existing;
				}
				counter.addAndGet(1);
			}
			return 1;
		} catch (Exception e) {
			e.printStackTrace();
			throw new DCacheException(e);
		}
	}
	
	/**
	 * Reads the value column of every row in the result set and decodes it as
	 * UTF-8 text; rows whose value column is {@code null} are skipped.
	 *
	 * @param tableName table the rows came from (not used here)
	 * @param rs        result set to read; not closed here
	 * @return the decoded values in result-set order
	 * @throws DCacheException wrapping any failure while reading or decoding
	 */
	@Override
	protected List<String> converResult2Json(String tableName,ResultSet rs) throws DCacheException {
		List<String> values = new ArrayList<String>();
		try {
			while(rs.next()){
				byte[] raw = rs.getBytes(COLUMN_VALUE);
				if(raw == null) {
					continue;
				}
				values.add(new String(raw, "utf-8"));
			}
			return values;
		} catch (Exception e) {
			e.printStackTrace();
			throw new DCacheException(e);
		}
	}
	/**
	 * Deletes one entity through the superclass and, when {@code flushdb} is
	 * set and the table has index tables, removes the key from those too.
	 *
	 * @param tableName table name
	 * @param id        row key
	 * @param flushdb   whether to apply the delete to the database immediately
	 * @throws DCacheException if the delete fails
	 */
	@Override
	public void delEntity(String tableName, long id,boolean flushdb)
			throws DCacheException {
		super.delEntity(tableName, id, flushdb);
		if(!flushdb) {
			return;
		}
		List<String> indexTables = ((KVTableInfo)tables.get(tableName)).getTableIndexs();
		if(indexTables.size() > 0) {
			deleteIndex(id, indexTables);
		}
	}
	
	/**
	 * Stores a batch of entities of one table. With {@code flushdb} the rows
	 * and their index data are written to the database; otherwise the changes
	 * are placed on the update queue. In both cases every value is then
	 * written to the cache and the put counters are updated.
	 * <p>
	 * A pooled connection is only borrowed when the database is actually
	 * written, so queued writes do not depend on the pool being available.
	 *
	 * @param tableName      table name
	 * @param keyList        row keys
	 * @param valueList      row values, parallel to {@code keyList}
	 * @param indexAttr      indexed attribute names
	 * @param indexValueList index values passed on to the batch writer
	 * @param changedList    changed attributes per row, used when queueing
	 * @param flushdb        whether to write to the database immediately
	 * @throws DCacheException wrapping any failure
	 */
	@Override
	public void putEntityList(String tableName, List<Long> keyList, List<String> valueList, List<String> indexAttr,
            List<List<String>> indexValueList,List<LinkedHashMap<String,String>> changedList,boolean flushdb)
			throws DCacheException {
		Connection conn=null;
		try{
			if(flushdb) {
				conn=pool.getConnection();
				this.putMysqlBatch(tableName, keyList, valueList,indexAttr,indexValueList,conn);
			}
			else {
				// Deferred write: hand the changes to the update queue.
				putUpdateQueue(tableName, keyList, valueList, indexAttr, indexValueList, changedList);
			}
			for(int i=0;i<keyList.size();i++) {
				String memKey=this.genMemcKey(tableName, keyList.get(i));
				if(!client.set(memKey, valueList.get(i),DEFAULT_EXPIRE)) {
					logger.error("数据存储到缓存失败");
				}
			}
			putListTimes.getAndAdd(1);
			if(logger.isDebugEnabled()) {
				// Per-table counter, created on first use.
				AtomicLong ato = new AtomicLong();
				AtomicLong old = put.putIfAbsent(tableName, ato);
				if(old != null) {
					ato = old;
				}
				ato.addAndGet(keyList.size());
			}
		}catch(Exception e) {
			logger.error(e.getMessage(), e);
			throw new DCacheException(e);
		}finally{
			DaoUtil.close(conn);
		}
	}
	
	/**
	 * Deletes a batch of entities through the superclass and, when
	 * {@code flushdb} is set and the table has index tables, removes the keys
	 * from those too.
	 *
	 * @param tableName table name
	 * @param keyList   keys of the rows to delete
	 * @param flushdb   whether to apply the delete to the database immediately
	 * @throws DCacheException if the delete fails
	 */
	@Override
	public void delEntityList(String tableName, List<Long> keyList,boolean flushdb) throws DCacheException {
		super.delEntityList(tableName, keyList, flushdb);
		if(!flushdb) {
			return;
		}
		List<String> indexTables = ((KVTableInfo)tables.get(tableName)).getTableIndexs();
		if(indexTables.size() > 0) {
			this.deleteIndexBatch(indexTables, keyList);
		}
	}

	/**
	 * Stores entities that may belong to different tables. The parallel input
	 * lists are grouped by table name and each group is passed to
	 * {@link #putEntityList}. For a table that occurs several times, the index
	 * attribute list of its first occurrence is used.
	 *
	 * @param tableNameList  table name of each entity
	 * @param keyList        key of each entity
	 * @param valueList      value of each entity
	 * @param indexAttrList  indexed attribute names of each entity
	 * @param indexValueList index values of each entity
	 * @param changedList    changed attributes of each entity
	 * @param flushdb        whether to write to the database immediately
	 * @throws DCacheException if storing any group fails
	 */
	@Override
	public void putMultiEntityList(List<String> tableNameList, List<Long> keyList, List<String> valueList,  
    		List<List<String>> indexAttrList, List<List<String>> indexValueList,List<LinkedHashMap<String,String>> changedList,boolean flushdb)
			throws DCacheException {
		Map<String,List<Long>> keysByTable = new HashMap<String,List<Long>>();
		Map<String,List<String>> valuesByTable = new HashMap<String,List<String>>();
		Map<String,List<String>> attrsByTable = new HashMap<String,List<String>>();
		Map<String,List<List<String>>> indexValuesByTable = new HashMap<String,List<List<String>>>();
		Map<String,List<LinkedHashMap<String,String>>> changesByTable = new HashMap<String,List<LinkedHashMap<String,String>>>();

		for(int pos = 0; pos < tableNameList.size(); pos++) {
			String table = tableNameList.get(pos);

			// First occurrence of a table: create its (empty) groups.
			if(!keysByTable.containsKey(table)) {
				keysByTable.put(table, new ArrayList<Long>());
			}
			if(!valuesByTable.containsKey(table)) {
				valuesByTable.put(table, new ArrayList<String>());
			}
			if(!attrsByTable.containsKey(table)) {
				attrsByTable.put(table, indexAttrList.get(pos));
			}
			if(!indexValuesByTable.containsKey(table)) {
				indexValuesByTable.put(table, new ArrayList<List<String>>());
			}
			if(!changesByTable.containsKey(table)) {
				changesByTable.put(table, new ArrayList<LinkedHashMap<String,String>>());
			}

			keysByTable.get(table).add(keyList.get(pos));
			valuesByTable.get(table).add(valueList.get(pos));
			indexValuesByTable.get(table).add(indexValueList.get(pos));
			changesByTable.get(table).add(changedList.get(pos));
		}

		for(String table : keysByTable.keySet()) {
			List<Long> keys = keysByTable.get(table);
			List<String> values = valuesByTable.get(table);
			if(keys.size() > 0 && values.size() > 0) {
				this.putEntityList(table, keys, values, attrsByTable.get(table),
						indexValuesByTable.get(table), changesByTable.get(table), flushdb);
			}
		}
	}

	/**
	 * Queries entities through the index table of the condition's filter
	 * attribute, or of its order attribute when no filter attribute is set.
	 *
	 * @param tableName base table name
	 * @param condition query condition
	 * @return the result of the superclass query on the index table
	 * @throws DCacheException if the query fails
	 */
	@Override
	public List<String> queryEntityList(String tableName, Condition condition)
			throws DCacheException {
		String indexedAttr = StringUtils.isNullOrEmpty(condition.getFilterAttr())
				? condition.getOrderAttr()
				: condition.getFilterAttr();
		return super.queryEntityList(genIndexTable(tableName, indexedAttr), condition);
	}
	
	/**
	 * Returns the column name used for values in queries; always the value column.
	 *
	 * @param condition query condition (not used here)
	 * @return the value column name
	 */
	@Override
	protected String genQueryValueName(Condition condition) {
		return COLUMN_VALUE;
	}
	
	/**
	 * Returns the column name used for keys in queries; always the key column.
	 *
	 * @param tableName table name (not used here)
	 * @return the key column name
	 */
	@Override
	protected String genQueryKeyName(String tableName) {
		return COLUMN_KEY;
	}
	
	/**
	 * Counts matching rows by delegating to the superclass against the index
	 * table of the condition's filter attribute.
	 *
	 * @param tableName base table name
	 * @param condition query condition; its filter attribute selects the index table
	 * @return the count returned by the superclass
	 * @throws DCacheException if the count fails
	 */
	@Override
	public long countWithCondition(String tableName, Condition condition)
			throws DCacheException {
		return super.countWithCondition(genIndexTable(tableName, condition.getFilterAttr()), condition);
	}

	/**
	 * Queries index entries through the index table of the condition's filter
	 * attribute, or of its order attribute when no filter attribute is set.
	 *
	 * @param tableName base table name
	 * @param condition query condition
	 * @return the result of the superclass query on the index table
	 * @throws DCacheException if the query fails
	 */
	@Override
	public List<IndexEntry> queryIndexList(String tableName, Condition condition)
			throws DCacheException {
		String indexedAttr = StringUtils.isNullOrEmpty(condition.getFilterAttr())
				? condition.getOrderAttr()
				: condition.getFilterAttr();
		return super.queryIndexList(genIndexTable(tableName, indexedAttr), condition);
	}
	
	
	/**
	 * Warms the cache from the database: loads every main-table row whose
	 * {@code LastTime} is later than {@code afterDate} into the cache, using
	 * up to 150 worker threads.
	 * <p>
	 * Tables are first counted; each table then gets a share of the workers
	 * proportional to its row count (at least one), and its rows are split
	 * into {@code LIMIT offset,size} pages, one page per worker.
	 *
	 * @param afterDate  lower bound (exclusive) for {@code LastTime}; embedded
	 *                   in the SQL text, so quotes and backslashes are rejected
	 * @param preloadPwd password that must match {@code cfg.getPreloadPwd()}
	 * @throws DCacheException if the password does not match, {@code afterDate}
	 *                         is invalid, or counting the tables fails
	 */
	public void preLoad(String afterDate,String preloadPwd) throws DCacheException {
		// The password must be compared with preloadPwd, not with afterDate.
		String expectedPwd = cfg.getPreloadPwd();
		if(expectedPwd == null || !expectedPwd.equals(preloadPwd)) {
			throw new DCacheException("加载秘密错误");
		}
		// afterDate is concatenated into a quoted SQL literal below; refuse
		// anything that could terminate or escape that literal.
		if(afterDate == null || afterDate.indexOf('\'') != -1 || afterDate.indexOf('\\') != -1) {
			throw new DCacheException("invalid afterDate: " + afterDate);
		}

		Connection conn=null;
		PreparedStatement ps=null;
		ResultSet rs=null;
		// Row count per main table (index tables are skipped).
		Map<String,Long> rowCounts = new HashMap<String,Long>();
		long totalRecord = 0;
		try{
			conn=pool.getConnection();
			String sql="show tables";
			ps=conn.prepareStatement(sql);

			rs=ps.executeQuery();
			while(rs.next()) {
				String tablename = rs.getString(1);
				if(tablename.contains(TABLE_INDEX_SPLIT)) {
					continue;
				}
				rowCounts.put(tablename,0l);
			}
			DaoUtil.close(rs);
			DaoUtil.close(ps);
			rs = null;
			ps = null;
			for(String name : rowCounts.keySet()) {
				sql = "select count("+COLUMN_KEY+") from " + name;
				ps = conn.prepareStatement(sql);
				rs = ps.executeQuery();
				if(rs.next()) {
					long ret = rs.getLong(1);
					totalRecord += ret;
					rowCounts.put(name, ret);
				}
				DaoUtil.close(rs);
				DaoUtil.close(ps);
				rs = null;
				ps = null;
			}
		}catch(Exception e) {
			logger.error(e.getMessage(), e);
			throw new DCacheException(e);
		}finally{
			DaoUtil.close(rs);
			DaoUtil.close(ps);
			DaoUtil.close(conn);
		}

		int thread = 150;
		// Page query -> table name.
		Map<String,String> tablesql = new HashMap<String,String>();
		for(Entry<String, Long> e : rowCounts.entrySet()) {
			long rows = e.getValue();
			if(rows == 0) {
				// Nothing to load; avoids a pointless "limit 0,0" query.
				continue;
			}
			// Workers assigned to this table, proportional to its size.
			int workers = (int)(thread * rows/(totalRecord*1f));
			if(workers == 0) {
				workers = 1;
			}
			// Page size, rounded up so the last rows are not dropped.
			long split = (rows + workers - 1) / workers;
			String prefix = "select "+COLUMN_KEY+","+COLUMN_VALUE+" from "
					+ e.getKey() + " where LastTime > '" + afterDate + "' limit ";
			for(int i=0;i<workers;i++) {
				tablesql.put(prefix + (i*split)+","+split, e.getKey());
			}
		}
		System.out.println("tablesql size:"+tablesql.size());
		ExecutorService executorService = Executors.newFixedThreadPool(thread,new LoadDataThreadFactory());
		try {
			CompletionService<Long> completion = new ExecutorCompletionService<Long>(executorService);
			for(Entry<String, String> e : tablesql.entrySet()) {
				completion.submit(new LoadData(e.getValue(),e.getKey()));
			}
			long spendTime = 0;
			for(int i=0;i<tablesql.size();i++) {
				try {
					spendTime += completion.take().get();
				} catch (InterruptedException interrupted) {
					// Preserve the interrupt for the caller and stop waiting.
					Thread.currentThread().interrupt();
					break;
				} catch (Exception e) {
					logger.error(e.getMessage(), e);
				}
			}
			System.out.println("preLoad 执行完毕--------花费时间:"+spendTime);
		} finally {
			executorService.shutdown();
		}
	}
	
	/**
	 * Task that runs one page query and copies every returned row into the
	 * cache. The task result is the elapsed time in milliseconds.
	 */
	private class LoadData implements Callable<Long> {
		private final String tableName;
		private final String sql;

		/**
		 * @param tableName table the rows belong to; used to build cache keys
		 * @param sql       query returning the key in column 1 and the value in column 2
		 */
		public LoadData(String tableName,String sql) {
			this.tableName = tableName;
			this.sql = sql;
		}

		/**
		 * Runs the query and caches each row. Roughly every five minutes the
		 * cache fill ratio is checked, and loading stops once it exceeds 80%.
		 * Failures are logged, not propagated.
		 *
		 * @param tableName table the rows belong to
		 * @return elapsed time in milliseconds
		 */
		private long loadData(String tableName) {
			Connection conn=null;
			PreparedStatement ps=null;
			ResultSet rs=null;
			long begin = System.currentTimeMillis();
			final long checkInterval = 5 * 60 * 1000;
			long lastCheck = begin;
			try {

				conn=pool.getConnection();
				ps = conn.prepareStatement(sql);
				rs = ps.executeQuery();
				while(rs.next()) {
					byte[] key = rs.getBytes(1);
					byte[] value = rs.getBytes(2);
					// A row without key or value cannot be cached; skip it
					// instead of failing the whole page.
					if(key == null || value == null) {
						continue;
					}
					client.set(genMemcKey(tableName, new String(key,"utf-8")), new String(value,"utf-8"),DEFAULT_EXPIRE);
					// Compare against the last check time; testing for an exact
					// multiple of the interval would almost never match.
					long now = System.currentTimeMillis();
					if(now - lastCheck >= checkInterval) {
						lastCheck = now;
						if(memcItemPerc() > 0.8f) {
							break;
						}
					}
				}
			}catch (Exception e) {
				logger.error("异常:sql:"+sql,e);
			} finally {
				DaoUtil.close(rs);
				DaoUtil.close(ps);
				DaoUtil.close(conn);
			}
			return System.currentTimeMillis() - begin;
		}

		/**
		 * Runs the load and reports its duration.
		 *
		 * @return elapsed time in milliseconds
		 * @see java.util.concurrent.Callable#call()
		 */
		public Long call() throws Exception {
			long ret = 0;
			System.out.println(sql+" is run");
			ret = loadData(tableName);
			System.out.println(sql+"  run end");
			return ret;
		}

	}
	
	/** Thread factory for the preload workers; every thread runs at maximum priority. */
	private class LoadDataThreadFactory implements ThreadFactory {

		/**
		 * @param r task the new thread will run
		 * @return a new, unstarted thread with {@link Thread#MAX_PRIORITY}
		 */
		public Thread newThread(Runnable r) {
			Thread worker = new Thread(r);
			// Highest priority.
			worker.setPriority(Thread.MAX_PRIORITY);
			return worker;
		}

	}
	
	/**
	 * Returns the cache fill ratio. The computation is currently commented
	 * out, so this always returns {@code 0}.
	 * <p>
	 * The disabled code below parsed the cache statistics and divided
	 * {@code bytes} by {@code limit_maxbytes}.
	 *
	 * @return always {@code 0f} in the current implementation
	 * @throws Exception declared but not thrown by the current body
	 */
	private float memcItemPerc() throws Exception {
		float ret = 0f;
//		Map<InetSocketAddress,Map<String,String>> stats = client.getStats();
		
//		Map<String,String> map=new HashMap<String,String>();
//		if(stats!=null)
//		{
//			String[] str=stats.split("\n");
//			for(String line:str) {
//				if(line.length()>0 && line.split(":").length==2) {
//					map.put(line.split(":")[0].replace("\t", "").trim(), line.split(":")[1].replace("\t", "").trim());
//				}
//			}
//		}
//		if(NumberUtils.toFloat(map.get("limit_maxbytes")) > 0f) {
//			ret = NumberUtils.toFloat(map.get("bytes")) / NumberUtils.toFloat(map.get("limit_maxbytes"));
//		}
		return ret;
	}
	
	/**
	 * Leftover manual entry point; it only prints the current time in
	 * milliseconds. The commented-out scratch code and the try/catch around a
	 * statement that cannot throw {@code DCacheException} have been removed.
	 *
	 * @param args ignored
	 */
	public static void main(String[] args) {
		System.out.println(System.currentTimeMillis());
	}

	/**
	 * Debug helper that lists the database tables. The per-table query this
	 * method was apparently meant to run was never finished, so each table
	 * name is only read and discarded.
	 *
	 * @throws DCacheException if the table list cannot be read; SQL errors are
	 *                         no longer swallowed silently
	 */
	public void test() throws DCacheException {

		Connection conn=null;
		PreparedStatement ps=null;
		ResultSet rs=null;
		try{
			conn=pool.getConnection();
			ps=conn.prepareStatement("show tables");
			rs=ps.executeQuery();
			while(rs.next()) {
				// Placeholder: nothing is done with the table name yet.
				rs.getString(1);
			}
		}catch(Exception e) {
			logger.error(e.getMessage(), e);
			throw new DCacheException(e);
		}finally{
			DaoUtil.close(rs);
			DaoUtil.close(ps);
			DaoUtil.close(conn);
		}
	}

	/**
	 * Queues one entity for a deferred database write. The queued map holds
	 * the main-table key and value plus every changed attribute that is also
	 * an indexed attribute.
	 *
	 * @param tableName  table name
	 * @param id         row key
	 * @param value      row value
	 * @param indexAttr  indexed attribute names
	 * @param indexValue index values (not used here)
	 * @param changed    changed attributes and their new values
	 * @throws DCacheException if queueing fails
	 */
	private void putUpdateQueue(String tableName, long id, String value,
			List<String> indexAttr, List<String> indexValue,
			LinkedHashMap<String, String> changed) throws DCacheException {

		Map<String,String> queued = new HashMap<String, String>();
		// Main-table data.
		queued.put(COLUMN_VALUE, value);
		queued.put(COLUMN_KEY, String.valueOf(id));
		// Index data: only changed attributes that are indexed.
		for(Entry<String, String> change : changed.entrySet()) {
			String attr = change.getKey();
			if(!indexAttr.contains(attr)) {
				continue;
			}
			queued.put(attr, change.getValue().toString());
		}
		putUpdateQueue(tableName, id, queued);
	}

	/**
	 * Queues a batch of entities for a deferred database write. For each
	 * value, the queued map holds the main-table key and value plus every
	 * changed attribute of that row that is also an indexed attribute.
	 *
	 * @param tableName      table name
	 * @param keyList        row keys
	 * @param valueList      row values, parallel to {@code keyList}
	 * @param indexAttr      indexed attribute names
	 * @param indexValueList index values (not used here)
	 * @param changedList    changed attributes per row
	 * @throws DCacheException if queueing fails
	 */
	private void putUpdateQueue(String tableName, List<Long> keyList,
			List<String> valueList, List<String> indexAttr,
			List<List<String>> indexValueList,
			List<LinkedHashMap<String, String>> changedList) throws DCacheException {
		List<Map<String, String>> queuedRows = new ArrayList<Map<String,String>>();
		for(int row = 0; row < valueList.size(); row++) {
			Map<String,String> queued = new HashMap<String, String>();
			// Main-table data.
			queued.put(COLUMN_VALUE, valueList.get(row));
			queued.put(COLUMN_KEY, String.valueOf(keyList.get(row)));
			// Index data: only changed attributes that are indexed.
			for(Entry<String, String> change : changedList.get(row).entrySet()) {
				String attr = change.getKey();
				if(!indexAttr.contains(attr)) {
					continue;
				}
				queued.put(attr, change.getValue().toString());
			}
			queuedRows.add(queued);
		}
		putUpdateQueue(tableName, keyList, queuedRows);
	}
	
	/**
	 * Applies one queued change to the database. The key and value entries
	 * are removed from {@code changed} and written to the main table through
	 * the superclass; every remaining entry is treated as an index attribute
	 * and written to its index table.
	 *
	 * @param tableName table name
	 * @param changed   queued data; the key and value entries are removed from it
	 * @param conn      connection to use
	 * @throws DCacheException if a database operation fails
	 */
	@Override
	public void putFromUpdateQueue(String tableName,
			LinkedHashMap<String, String> changed,Connection conn) throws DCacheException {
		String id = changed.remove(COLUMN_KEY);
		String value = changed.remove(COLUMN_VALUE);

		LinkedHashMap<String,String> mainRow = new LinkedHashMap<String, String>();
		mainRow.put(COLUMN_KEY, id);
		mainRow.put(COLUMN_VALUE, value);

		// Whatever is left after removing key and value is index data.
		List<String> indexAttr = new ArrayList<String>(changed.keySet());
		List<String> indexValue = new ArrayList<String>(changed.values());

		super.putFromUpdateQueue(tableName, mainRow,conn);
		// Update the index tables.
		putIndex(tableName, Long.parseLong(id), indexAttr, indexValue,conn);
	}
}
